package {{packageName}}.sample.infrastructure.primary.kafka.consumer;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.*;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import {{packageName}}.UnitTest;

/**
 * Unit tests for {@link SampleConsumer}, run against a mocked {@link KafkaConsumer}.
 *
 * <p>The runner tests only verify that the consumer subscribes to the expected topic within
 * {@link #TIMEOUT_FOR_EXECUTOR} milliseconds; they do not assert on how polled records are processed.
 */
@UnitTest
@ExtendWith(MockitoExtension.class)
class SampleConsumerTest {

  /** Maximum time, in milliseconds, to wait for the expected interaction with the mocked consumer. */
  public static final int TIMEOUT_FOR_EXECUTOR = 3000;

  /** Topic used both to build the consumer under test and to build the fixture records. */
  private static final String TOPIC = "queue.jhipster.sample";

  /** Polling timeout, in milliseconds, given to the consumer under test and matched when stubbing {@code poll}. */
  private static final int POLLING_TIMEOUT = 10_000;

  @Mock
  private KafkaConsumer<String, String> consumer;

  private SampleConsumer sampleConsumer;

  /**
   * Builds the consumer under test and reflectively invokes its {@code init} and {@code destroy} methods.
   *
   * @throws NoSuchMethodException if {@code init} or {@code destroy} is not declared on {@link SampleConsumer}
   * @throws IllegalAccessException if a lifecycle method cannot be accessed
   * @throws InvocationTargetException if a lifecycle method throws
   */
  @BeforeEach
  void setUp() throws InvocationTargetException, IllegalAccessException, NoSuchMethodException {
    sampleConsumer = new SampleConsumer(TOPIC, POLLING_TIMEOUT, consumer);

    // The lifecycle methods are looked up reflectively because they may not be visible from this test.
    Method initMethod = SampleConsumer.class.getDeclaredMethod("init");
    initMethod.setAccessible(true);
    initMethod.invoke(sampleConsumer);

    // NOTE(review): destroy is invoked right after init, before every test; presumably to exercise
    // the shutdown path. Confirm this ordering is intentional.
    Method destroyMethod = SampleConsumer.class.getDeclaredMethod("destroy");
    destroyMethod.setAccessible(true);
    destroyMethod.invoke(sampleConsumer);
  }

  /** A record carrying a non-null value is reported as handled. */
  @Test
  void shouldHandleMessage() {
    ConsumerRecord<String, String> record = new ConsumerRecord<>(TOPIC, 0, 0, null, "sample message");

    boolean handled = sampleConsumer.handleMessage(record);

    assertThat(handled).isTrue();
  }

  /** A record carrying a null value is reported as not handled. */
  @Test
  void shouldNotHandleMessage() {
    ConsumerRecord<String, String> record = new ConsumerRecord<>(TOPIC, 0, 0, null, null);

    boolean handled = sampleConsumer.handleMessage(record);

    assertThat(handled).isFalse();
  }

  /** With the consumer open and {@code poll} returning one record, the runner subscribes to the topic. */
  @Test
  void shouldExecuteKafkaRunner() {
    List<ConsumerRecord<String, String>> partitionRecords = new ArrayList<>();
    partitionRecords.add(new ConsumerRecord<>(TOPIC, 0, 0, null, "sample message"));
    Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsByPartition = new HashMap<>();
    // The partition key uses the same topic as the record it holds so the fixture is self-consistent.
    recordsByPartition.put(new TopicPartition(TOPIC, 0), partitionRecords);
    ConsumerRecords<String, String> records = new ConsumerRecords<>(recordsByPartition);
    when(consumer.poll(Duration.ofMillis(POLLING_TIMEOUT))).thenReturn(records);
    sampleConsumer.setClosed(false);

    sampleConsumer.executeKafkaRunner();

    verify(consumer, timeout(TIMEOUT_FOR_EXECUTOR)).subscribe(Collections.singleton(TOPIC));
  }

  /** With the consumer flagged as closed and no {@code poll} stubbing, the runner still subscribes to the topic. */
  @Test
  void shouldExecuteKafkaRunnerNotPolling() {
    sampleConsumer.setClosed(true);

    sampleConsumer.executeKafkaRunner();

    verify(consumer, timeout(TIMEOUT_FOR_EXECUTOR)).subscribe(Collections.singleton(TOPIC));
  }

  /** The runner subscribes to the topic when {@code poll} is stubbed to throw a {@link NullPointerException}. */
  @Test
  void shouldThrowsException() {
    sampleConsumer.setClosed(false);
    when(consumer.poll(Duration.ofMillis(POLLING_TIMEOUT))).thenThrow(NullPointerException.class);

    sampleConsumer.executeKafkaRunner();

    verify(consumer, timeout(TIMEOUT_FOR_EXECUTOR)).subscribe(Collections.singleton(TOPIC));
  }

  /** The runner subscribes to the topic when {@code poll} is stubbed to throw a {@link WakeupException}. */
  @Test
  void shouldThrowsWakeupException() {
    sampleConsumer.setClosed(false);
    when(consumer.poll(Duration.ofMillis(POLLING_TIMEOUT))).thenThrow(WakeupException.class);

    sampleConsumer.executeKafkaRunner();

    verify(consumer, timeout(TIMEOUT_FOR_EXECUTOR)).subscribe(Collections.singleton(TOPIC));
  }

  /** With the consumer flagged as closed, {@code subscribe} is still invoked even though it throws a {@link WakeupException}. */
  @Test
  void shouldThrowsWakeupExceptionWithClosing() {
    sampleConsumer.setClosed(true);
    doThrow(WakeupException.class).when(consumer).subscribe(Collections.singleton(TOPIC));

    sampleConsumer.executeKafkaRunner();

    verify(consumer, timeout(TIMEOUT_FOR_EXECUTOR)).subscribe(Collections.singleton(TOPIC));
  }

  /** The closed flag set through {@code setClosed} is returned by {@code isClosed}. */
  @Test
  void shouldSetClosed() {
    sampleConsumer.setClosed(true);

    assertThat(sampleConsumer.isClosed()).isTrue();
  }
}
